Conversation


@Fokko Fokko commented Oct 10, 2025

What changes are proposed in this pull request?

Hey everyone, this is a first PR to start the discussion around writing Parquet files.

Currently, the way to write Parquet is to completely delegate this to the engine, for example here: https://github.com/dl-rs-private/delta-kernel-rs/blob/a096d013f876ed29beef9379cf4cd713e9febd90/kernel/src/checkpoint/mod.rs#L44

Some things to consider:

  • In the DefaultParquetHandler there is write_parquet:
    // Write `data` to `{path}/<uuid>.parquet` as parquet using ArrowWriter and return the parquet
    // metadata (where `<uuid>` is a generated UUIDv4).
    //
    // Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in
    // order to obtain metadata about the object just written.
    async fn write_parquet(
        &self,
        path: &url::Url,
        data: Box<dyn EngineData>,
    ) -> DeltaResult<DataFileMetadata> {

    But this one is very much focused on writing DataFiles. This is not something we really need if we want to write generic Parquet (for example, a checkpoint).
  • I've started with () as a return type so we can extend it later on. We could also return things like the file size, but that would introduce another HEAD request, so we need to consider whether that's something we really need.
  • Right now it writes everything into a single Parquet file. We could also make it fancier and have a ParquetWriter that consumes batches of EngineData (a rough sketch of that idea is below). For the snapshot, this is not a requirement.
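To make the batch-consuming idea concrete, here is a rough sketch of what such a writer could look like; the trait name ParquetWriter and its methods are hypothetical and not part of this PR:

// Hypothetical sketch only, not part of this PR. A stateful writer that
// consumes batches of EngineData and produces a single Parquet file once
// closed; each write call could map to one row group. DeltaResult and
// EngineData are the existing kernel types.
pub trait ParquetWriter: Send {
    /// Append one batch of data to the file being written.
    fn write(&mut self, data: Box<dyn EngineData>) -> DeltaResult<()>;

    /// Flush any buffered data and finalize the Parquet footer.
    fn close(self: Box<Self>) -> DeltaResult<()>;
}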

Resolves #1376

This PR affects the following public APIs

Introduces a new public API, and extends an existing trait.

How was this change tested?

@Fokko Fokko added the breaking-change Change that require a major version bump label Oct 10, 2025

codecov bot commented Oct 10, 2025

Codecov Report

❌ Patch coverage is 93.61022% with 20 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.75%. Comparing base (2ec1462) to head (904c01e).

Files with missing lines Patch % Lines
kernel/src/engine/sync/parquet.rs 93.87% 0 Missing and 9 partials ⚠️
kernel/src/engine/default/parquet.rs 95.30% 0 Missing and 7 partials ⚠️
kernel/src/engine/arrow_utils.rs 76.47% 0 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1392      +/-   ##
==========================================
+ Coverage   84.65%   84.75%   +0.09%     
==========================================
  Files         115      115              
  Lines       29557    29858     +301     
  Branches    29557    29858     +301     
==========================================
+ Hits        25021    25305     +284     
  Misses       3329     3329              
- Partials     1207     1224      +17     


/// # Returns
///
/// A [`DeltaResult`] indicating success or failure.
fn write_parquet_file(&self, url: url::Url, data: Box<dyn EngineData>) -> DeltaResult<()>;
Collaborator

Would it make sense for the API to take write options, e.g. compression, row group size, etc.?

Collaborator Author

Compression would make sense to me, but row group size is often more complex: some writers take a number of rows, while others take a size in bytes. Alternatively, we could also let the engine decide this? (A rough sketch of what an options parameter could look like is below.)
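For illustration, a write-options parameter could look roughly like this; ParquetWriteOptions and Compression are made-up names for the discussion, not an agreed API:

// Hypothetical sketch only. Everything not specified here (row group sizing,
// encodings, page sizes, ...) would remain an engine decision.
#[derive(Debug, Clone, Default)]
pub struct ParquetWriteOptions {
    /// Requested compression codec; None leaves the choice to the engine.
    pub compression: Option<Compression>,
}

#[derive(Debug, Clone, Copy)]
pub enum Compression {
    Uncompressed,
    Snappy,
    Zstd,
}

// The handler method could then take the options alongside the data:
// fn write_parquet_file(&self, url: url::Url, data: Box<dyn EngineData>,
//     options: ParquetWriteOptions) -> DeltaResult<()>;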

/// # Returns
///
/// A [`DeltaResult`] indicating success or failure.
fn write_parquet_file(&self, url: url::Url, data: Box<dyn EngineData>) -> DeltaResult<()>;
Collaborator

Shouldn't data be an iterator of FilteredEngineData, since that is what the checkpoint producer produces?

Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>>>

Collaborator Author

After digging a bit more into the code, I think this makes sense. Having the writer do the filtering was not immediately obvious to me, but it looks like we are also delegating that to the engine.

Collaborator

Yeah, it avoids a copy in cases where the kernel has to filter out some rows. Also consistent with the existing JSON write API.

Collaborator Author

Yes, I agree. I think it would be nice to have a convenience From implementation to convert EngineData into FilteredEngineData (sketched below): #1397
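For reference, the conversion proposed in #1397 could look roughly like this. It is only a sketch, assuming FilteredEngineData is a plain struct with data and selection_vector fields and that EngineData exposes a row count:

// Sketch of the convenience conversion suggested in #1397, not the actual
// implementation. It marks every row as selected, so the resulting
// FilteredEngineData is equivalent to the original EngineData.
impl From<Box<dyn EngineData>> for FilteredEngineData {
    fn from(data: Box<dyn EngineData>) -> Self {
        let num_rows = data.len(); // assumes EngineData exposes a row count
        FilteredEngineData {
            data,
            selection_vector: vec![true; num_rows],
        }
    }
}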

read_files(files, schema, predicate, try_create_from_parquet)
}

fn write_parquet_file(
Collaborator

A lot of the machinery in this and the default client looks the same; just rip these out into a pub(crate) fn so they use the same logic. Or is there a reason they are separate?

Collaborator Author

I think they are different, and it would be good to keep them separate:

  • SyncParquetHandler writes directly to a file, which makes sense since it only supports the local FS.
  • DefaultParquetHandler first buffers everything in memory, and then pushes it to the object store.

They look pretty similar today, but I think they might diverge more in the future when we start doing more optimizations (a simplified sketch of the two paths is below).
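For context, the two paths described above look roughly like this, using plain arrow/parquet types; this is a simplified sketch, not the exact code from this PR:

use std::fs::File;
use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;

// Sync path: stream straight into a local file.
fn write_local(
    path: &std::path::Path,
    batch: &RecordBatch,
) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(path)?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
    writer.write(batch)?;
    writer.close()?; // writes the Parquet footer
    Ok(())
}

// Default path: encode into an in-memory buffer first; the caller then does
// a single PUT of the buffer to the object store.
fn encode_in_memory(
    batch: &RecordBatch,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;
    writer.write(batch)?;
    writer.close()?;
    Ok(buffer)
}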

@Fokko Fokko closed this Oct 14, 2025
@Fokko Fokko deleted the fd-write-parquet branch October 14, 2025 20:40
@Fokko Fokko restored the fd-write-parquet branch October 14, 2025 21:29
@Fokko Fokko reopened this Oct 14, 2025
@nicklan nicklan self-requested a review October 14, 2025 23:17
fn write_parquet_file(
&self,
url: url::Url,
data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
Contributor

Why not just data: Box<dyn EngineData>?

  1. Why Iterator?
  2. Why FilteredEngineData?

Collaborator Author

  1. Why Iterator

For future proofing: you could write chunks of data that together are larger than memory. By having an iterator, you can stream them into the Parquet file. Arrow has a similar concept with ChunkedArray, where each chunk becomes a row group. I think we want to mimic that a bit here (a rough sketch is below this reply).

  2. Why FilteredEngineData

This was also not my first thought (see #1392 (comment)), but it aligns nicely with the JSON API. To make the syntax a bit friendlier and reduce the visual noise, I've suggested implementing the From trait to easily convert EngineData into FilteredEngineData: #1397.
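A rough sketch of the streaming idea, using plain RecordBatches for readability (the real API would take FilteredEngineData and apply the selection vector before writing); illustrative only, not the PR's code:

use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;

// Consume an iterator of already-filtered batches and flush each one as its
// own row group, so the whole input never has to sit in memory at once.
fn write_streaming<W, I>(sink: W, mut batches: I) -> Result<(), Box<dyn std::error::Error>>
where
    W: std::io::Write + Send,
    I: Iterator<Item = RecordBatch>,
{
    // Parquet needs a single schema for the whole file, so the first batch
    // defines it (this is the schema concern raised in the next comment).
    let first = batches.next().ok_or("no data to write")?;
    let mut writer = ArrowWriter::try_new(sink, first.schema(), None)?;
    writer.write(&first)?;
    writer.flush()?; // end the current row group
    for batch in batches {
        writer.write(&batch)?;
        writer.flush()?; // one row group per batch
    }
    writer.close()?; // finalize the footer
    Ok(())
}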

Contributor

Maybe I'm missing something, but I think we still need to go through all batches to identify the final schema; we're not guaranteed that it will be the same for all actions. Also, I believe you are collecting everything into a single in-memory object here: https://github.com/delta-io/delta-kernel-rs/pull/1392/files#diff-e05e7b3b94c5637bfc367192986135a7a8a3986c34dc1b22cfd4961647ce7664R64, so we still haven't addressed the potential "it might not fit into memory" problem.

With FilteredEngineData, I see we can use "selection_vector" – this is a good feature, I agree. Are we aware of cases when we can use it currently?

Collaborator

With FilteredEngineData, I see we can use "selection_vector" – this is a good feature, I agree. Are we aware of cases when we can use it currently?

We use it in the proposed remove_files PR (#1390)

Collaborator Author

Thanks for the pointer. For JSON the schema can change per row, but this can't be the case for Parquet. I've updated the code to remove the iterator for now. @anoopj WDYT?

@Fokko Fokko requested a review from emkornfield October 16, 2025 14:00
.try_collect()
.unwrap();

assert_eq!(data.len(), 1);
Collaborator

I think we also need to verify that field IDs are populated and that we can project based on field IDs for column indirection? Will this be a follow-up?

@gotocoding-DB
Contributor

I'm not a delta-kernel-rs maintainer, but from my POV this PR looks good.
P.S. Thanks for doing this, Fokko! We need this to simplify checkpoints.

predicate: Option<PredicateRef>,
) -> DeltaResult<FileDataReadResultIterator>;

/// Write data to a Parquet file at the specified URL.
Collaborator

We should specify the semantics around what happens if the file already exists.
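If we do want "fail if the file already exists" semantics, one way an engine could enforce it with object_store is a conditional put; sketch below (not part of the PR, and not every backend supports conditional puts):

use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload};

// Sketch: PutMode::Create makes the store reject the write when an object is
// already present at the path, instead of silently overwriting it.
async fn put_if_absent(
    store: &dyn ObjectStore,
    path: &Path,
    bytes: Vec<u8>,
) -> object_store::Result<()> {
    let opts = PutOptions {
        mode: PutMode::Create,
        ..Default::default()
    };
    store.put_opts(path, PutPayload::from(bytes), opts).await?;
    Ok(())
}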

// Convert FilteredEngineData to RecordBatch, applying selection filter
let batch = filter_to_record_batch(data)?;

// We buffer it in the application first, and then push everything to the object-store.
Collaborator

We should use the async_writer

As they note on that page: object_store provides its native implementation of AsyncFileWriter by ParquetObjectWriter.

So you could do something like:

let path = Path::from_url_path(location.path())?;
let object_writer = ParquetObjectWriter::new(self.store.clone(), path);
let mut writer = AsyncArrowWriter::try_new(
    object_writer,
    batch.schema(),
    None, // could be some props if needed
)?;

// Block on the async write, and close the writer so the Parquet footer is written
self.task_executor.block_on(async move {
    writer.write(&batch).await?;
    writer.close().await
})?;
